In [1]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

1.3. BigQuery Storage & Spark MLlib - Python

Use the BigQuery Storage connector and Spark MLlib to build a linear regression model and make predictions.

Create Dataproc Cluster with Jupyter

This notebook is designed to be run on Google Cloud Dataproc.

Follow the links below for instructions on how to create a Dataproc Cluster with the Jupyter component installed.

Python 3 Kernel

Use a Python 3 kernel (not PySpark) so that you can configure the SparkSession in the notebook and include the spark-bigquery-connector, which is required to use the BigQuery Storage API.

Scala Version

Check which version of Scala you are running so you can include the correct spark-bigquery-connector jar.


In [2]:
!scala -version


cat: /release: No such file or directory
Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL

Create Spark Session

Include the correct version of the spark-bigquery-connector jar.

If you are using Scala version 2.11, use 'gs://spark-lib/bigquery/spark-bigquery-latest.jar'.

If you are using Scala version 2.12, use 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'.
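
If you prefer not to check by hand, a minimal sketch along these lines (the connector_jar variable name is just for illustration) parses the output of scala -version, which is written to stderr, and picks the matching jar:

import subprocess

# Run `scala -version`; note the version string is printed to stderr, not stdout
result = subprocess.run(['scala', '-version'],
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
scala_version_output = (result.stderr or result.stdout).decode()

# Pick the connector jar that matches the Scala build of Spark
if '2.12' in scala_version_output:
    connector_jar = 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'
else:
    connector_jar = 'gs://spark-lib/bigquery/spark-bigquery-latest.jar'

print(connector_jar)

The next cell simply hardcodes the 2.11 jar, which matches the Scala version reported above.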


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName('1.3. BigQuery Storage &  Spark MLlib - Python')\
  .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar') \
  .getOrCreate()

spark.version


Out[3]:
'2.4.5'

Enable repl.eagerEval

This outputs the results of DataFrames at each step without the need to call df.show(), and it also improves the formatting of the output.


In [4]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
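
If you want to adjust how much output is rendered, two related settings can be changed; the values shown below are the Spark defaults:

# Optional: tune how the eagerly evaluated output is rendered (defaults shown)
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 20)  # rows displayed per DataFrame
spark.conf.set("spark.sql.repl.eagerEval.truncate", 20)    # max characters shown per cell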

Read the data from BigQuery as a Spark DataFrame


In [5]:
table  = 'bigquery-public-data.samples.natality'

df_natality_table = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .load()

Limit the number of rows and cache the data

Limit the number of rows that will be read so this example runs faster.

The DataFrame is cached because LinearRegression is iterative; caching avoids re-reading the data from BigQuery Storage on each iteration.


In [6]:
limit = 10000

df_natality_select = df_natality_table \
.select("weight_pounds", "mother_age", "father_age", "gestation_weeks", "weight_gain_pounds", "apgar_5min") \
.where("""
weight_pounds IS NOT NULL 
AND mother_age IS NOT NULL
AND father_age IS NOT NULL
AND gestation_weeks IS NOT NULL
AND weight_gain_pounds IS NOT NULL
AND apgar_5min IS NOT NULL
""") \
.limit(limit) \
.cache()

df_natality_select.printSchema()


root
 |-- weight_pounds: double (nullable = true)
 |-- mother_age: long (nullable = true)
 |-- father_age: long (nullable = true)
 |-- gestation_weeks: long (nullable = true)
 |-- weight_gain_pounds: long (nullable = true)
 |-- apgar_5min: long (nullable = true)
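
As an aside, the spark-bigquery-connector can also apply a row restriction on the BigQuery side before data is streamed into Spark. A minimal sketch, reusing the table variable above and the connector's filter option (df_natality_pushdown is an illustrative name):

# Sketch: push a row filter down to BigQuery Storage instead of filtering in Spark
df_natality_pushdown = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "weight_pounds IS NOT NULL AND gestation_weeks IS NOT NULL") \
  .load() \
  .select("weight_pounds", "mother_age", "father_age",
          "gestation_weeks", "weight_gain_pounds", "apgar_5min")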

Optional

Run count to check the number of rows in the DataFrame.


In [7]:
df_natality_select.count()


Out[7]:
10000

Create an input DataFrame for Spark MLlib using VectorAssembler

Spark MLlib estimators expect a single vector column of features. Multiple columns can be converted to a single vector column using VectorAssembler.


In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["mother_age", "father_age", "gestation_weeks", "weight_gain_pounds", "apgar_5min"],
    outputCol="features")

df_assembler_output = assembler.transform(df_natality_select)
df_assembler_output


Out[8]:
weight_pounds | mother_age | father_age | gestation_weeks | weight_gain_pounds | apgar_5min | features
6.3713593718 | 31 | 34 | 37 | 30 | 9 | [31.0,34.0,37.0,3...
6.393405598 | 36 | 35 | 38 | 99 | 9 | [36.0,35.0,38.0,9...
6.21262654316 | 34 | 34 | 37 | 56 | 9 | [34.0,34.0,37.0,5...
9.67608867918 | 31 | 49 | 37 | 99 | 8 | [31.0,49.0,37.0,9...
6.3382900325 | 43 | 47 | 34 | 18 | 9 | [43.0,47.0,34.0,1...
7.3193470984 | 43 | 46 | 44 | 99 | 99 | [43.0,46.0,44.0,9...
7.06140625186 | 29 | 30 | 40 | 99 | 99 | [29.0,30.0,40.0,9...
6.20821729792 | 35 | 34 | 39 | 99 | 99 | [35.0,34.0,39.0,9...
7.8484565272 | 39 | 45 | 40 | 23 | 10 | [39.0,45.0,40.0,2...
5.8312268299 | 41 | 43 | 38 | 35 | 9 | [41.0,43.0,38.0,3...
8.12623897732 | 42 | 45 | 39 | 14 | 10 | [42.0,45.0,39.0,1...
7.87491199864 | 41 | 42 | 38 | 30 | 9 | [41.0,42.0,38.0,3...
9.06320359082 | 33 | 37 | 38 | 39 | 9 | [33.0,37.0,38.0,3...
7.25100379718 | 33 | 30 | 36 | 11 | 9 | [33.0,30.0,36.0,1...
6.4374980503999994 | 30 | 33 | 39 | 99 | 9 | [30.0,33.0,39.0,9...
9.2153225516 | 39 | 41 | 38 | 17 | 9 | [39.0,41.0,38.0,1...
7.3744626639 | 31 | 99 | 41 | 20 | 9 | [31.0,99.0,41.0,2...
5.74965579296 | 35 | 40 | 30 | 33 | 8 | [35.0,40.0,30.0,3...
4.4423145793 | 43 | 47 | 37 | 30 | 8 | [43.0,47.0,37.0,3...
3.7258122278 | 37 | 44 | 33 | 1 | 9 | [37.0,44.0,33.0,1...
only showing top 20 rows
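
Filtering out the NULL values earlier matters here, because VectorAssembler raises an error when it encounters a null. As a sketch, Spark 2.4's handleInvalid parameter could be used to drop such rows at assembly time instead (assembler_skip_nulls is an illustrative name):

# Sketch: drop rows containing nulls at assembly time instead of pre-filtering
assembler_skip_nulls = VectorAssembler(
    inputCols=["mother_age", "father_age", "gestation_weeks",
               "weight_gain_pounds", "apgar_5min"],
    outputCol="features",
    handleInvalid="skip")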

Create a training data DataFrame

Create a training data DataFrame with just the features and label column.

Cache the training data DataFrame.


In [9]:
df_training_data = df_assembler_output \
.select("features", "weight_pounds") \
.withColumnRenamed("weight_pounds","label")

df_training_data.cache()
df_training_data


Out[9]:
features | label
[31.0,34.0,37.0,3... | 6.3713593718
[36.0,35.0,38.0,9... | 6.393405598
[34.0,34.0,37.0,5... | 6.21262654316
[31.0,49.0,37.0,9... | 9.67608867918
[43.0,47.0,34.0,1... | 6.3382900325
[43.0,46.0,44.0,9... | 7.3193470984
[29.0,30.0,40.0,9... | 7.06140625186
[35.0,34.0,39.0,9... | 6.20821729792
[39.0,45.0,40.0,2... | 7.8484565272
[41.0,43.0,38.0,3... | 5.8312268299
[42.0,45.0,39.0,1... | 8.12623897732
[41.0,42.0,38.0,3... | 7.87491199864
[33.0,37.0,38.0,3... | 9.06320359082
[33.0,30.0,36.0,1... | 7.25100379718
[30.0,33.0,39.0,9... | 6.4374980503999994
[39.0,41.0,38.0,1... | 9.2153225516
[31.0,99.0,41.0,2... | 7.3744626639
[35.0,40.0,30.0,3... | 5.74965579296
[43.0,47.0,37.0,3... | 4.4423145793
[37.0,44.0,33.0,1... | 3.7258122278
only showing top 20 rows

Split the data into training and test sets

30% of the data is held out for testing.


In [10]:
(df_training, df_test) = df_training_data.randomSplit([0.7, 0.3])
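
randomSplit is not deterministic by default; if you want the same split on every run, pass an explicit seed (the value below is arbitrary):

# Reproducible variant of the same split
(df_training, df_test) = df_training_data.randomSplit([0.7, 0.3], seed=42)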

Construct a new LinearRegression object and fit the training data

Import and use the LinearRegression model.


In [11]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")

model = lr.fit(df_training)
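
As an alternative sketch, the assembler and the regression stages can be chained with a Pipeline so that the same feature preparation is applied when fitting and when predicting (pipeline and pipeline_model are illustrative names; LinearRegression expects the label column to be named "label" by default):

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Sketch: chain feature assembly and regression into a single estimator
pipeline = Pipeline(stages=[
    VectorAssembler(
        inputCols=["mother_age", "father_age", "gestation_weeks",
                   "weight_gain_pounds", "apgar_5min"],
        outputCol="features"),
    LinearRegression(maxIter=5, regParam=0.2, solver="normal"),
])

# Rename the target column to "label" before fitting on the raw selected rows
pipeline_model = pipeline.fit(
    df_natality_select.withColumnRenamed("weight_pounds", "label"))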

In [12]:
print("Coefficients:" + str(model.coefficients))
print("Intercept:" + str(model.intercept))


Coefficients:[0.024976900994897657,-0.00405761639159605,0.27142700913671225,-0.00020640385689556944,0.0008561742907835546]
Intercept:-3.8299366375006008

Summarize the model over the training data and print metrics


In [13]:
trainingSummary = model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

trainingSummary.residuals


numIterations: 1
objectiveHistory: [0.0]
RMSE: 1.205822
r2: 0.391391
Out[13]:
residuals
-0.06427120097813521
1.4054942809910722
1.615669149519861
1.8461326439359498
0.07032860053297885
-1.43244194979542
-1.7109598506752508
2.995209709060892
0.21341987241150395
1.267985188300587
1.029342775083852
-0.724076824537927
0.6977075173507492
-0.10059277216248841
-2.514989420406767
0.4217857187789251
-0.28999264665870594
0.6565114778614269
0.16326074403054758
-0.09278719428170223
only showing top 20 rows

Make predictions on test data


In [15]:
predictions = model.transform(df_test)
predictions.select("prediction", "label", "features")


Out[15]:
prediction | label | features
6.735331281174319 | 6.25671899556 | [13.0,17.0,38.0,6...
7.315888778823852 | 8.437090766739999 | [14.0,16.0,40.0,1...
7.372168136194236 | 7.50012615324 | [14.0,17.0,40.0,9...
6.7400855825755 | 5.37486994756 | [14.0,20.0,38.0,9...
5.682308546399798 | 5.8753192823 | [14.0,99.0,35.0,9...
4.33033463412318 | 3.0622208191799998 | [15.0,18.0,29.0,9...
7.134663854645244 | 9.06320359082 | [15.0,18.0,39.0,3...
6.58610098912506 | 8.000575487979999 | [15.0,19.0,37.0,4...
7.307916501843823 | 6.4374980503999994 | [15.0,20.0,40.0,9...
6.53890439878406 | 5.8753192823 | [15.0,99.0,38.0,1...
7.271971104022563 | 8.24969784404 | [15.0,99.0,41.0,3...
7.888608905619381 | 6.37576861704 | [15.0,99.0,43.0,5...
5.991988822721544 | 8.09316963802 | [16.0,16.0,35.0,9...
5.712223325739317 | 5.8135898489399995 | [16.0,17.0,34.0,9...
7.343353903431942 | 6.87401332916 | [16.0,17.0,40.0,9...
7.7032499285948335 | 9.25059651352 | [16.0,17.0,41.0,5...
7.1588151402125595 | 5.93704871566 | [16.0,18.0,39.0,4...
7.1588151402125595 | 6.56316153974 | [16.0,18.0,39.0,4...
7.0695816264852 | 6.75055446244 | [16.0,18.0,39.0,9...
6.270056292524166 | 4.9383546688 | [16.0,19.0,36.0,1...
only showing top 20 rows

Select (prediction, true label) and compute test error


In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 1.18882
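
The same evaluator can report other metrics by overriding metricName at evaluation time, and the fitted model can be saved to Cloud Storage for later reuse; a brief sketch, with a placeholder bucket path:

# Additional metrics from the same evaluator
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("MAE on test data = %g" % mae)
print("r2 on test data = %g" % r2)

# Sketch: persist the fitted model to GCS and load it back later (path is a placeholder)
from pyspark.ml.regression import LinearRegressionModel
model.write().overwrite().save("gs://your-bucket/models/natality-lr")
restored_model = LinearRegressionModel.load("gs://your-bucket/models/natality-lr")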